import json
import re
from typing import Any

from bfcl_eval.model_handler.local_inference.base_oss_handler import OSSHandler
from bfcl_eval.model_handler.utils import (
    convert_to_function_call,
    func_doc_language_specific_pre_processing,
)
from overrides import override


class Granite3FCHandler(OSSHandler):
    """
    Handler for the Granite-3.x series of models.

    Currently supports:
    - Granite-3.1-8B-Instruct (https://huggingface.co/ibm-granite/granite-3.1-8b-instruct)
    - Granite-3.2-8B-Instruct (https://huggingface.co/ibm-granite/granite-3.2-8b-instruct)
    """

    def __init__(self, model_name, temperature) -> None:
        super().__init__(model_name, temperature)
        # The "-FC" suffix is a BFCL naming convention; the actual HuggingFace
        # repository id does not include it.
        self.model_name_huggingface = model_name.replace("-FC", "")

    # copied from phi_fc.py
    @override
    def _pre_query_processing_prompting(self, test_entry: dict) -> dict:
        """
        Pre-process one test entry before prompting.

        Applies the language-specific function-document pre-processing and
        seeds an empty message list (Granite supplies its own system prompt
        inside `_format_prompt`, so none is added here).
        """
        functions: list = test_entry["function"]
        test_category: str = test_entry["id"].rsplit("_", 1)[0]

        functions = func_doc_language_specific_pre_processing(functions, test_category)

        # Granite uses its own system prompt (built in _format_prompt).
        return {"message": [], "function": functions}

    # copied from phi_fc.py
    @override
    def _parse_query_response_prompting(self, api_response: Any) -> dict:
        """
        Parses the raw response from the model API to extract the result, input token count, and output token count.

        Args:
            api_response (Any): The raw response from the model API.

        Returns:
            A dict containing the following elements:
                - model_responses (any): The parsed result that can be directly used as input to the decode method.
                - model_responses_message_for_chat_history (str): The raw completion text, preserved verbatim for the chat history.
                - input_token (int): The number of tokens used in the input to the model.
                - output_token (int): The number of tokens generated by the model as output.
        """
        model_responses_message_for_chat_history = api_response.choices[0].text
        model_responses = api_response.choices[0].text
        extracted_tool_calls = self._extract_tool_calls(model_responses)

        # Only treat the output as tool calls when every extracted item really
        # is one; the model sometimes echoes function documents that merely
        # look like tool calls (see _is_tool_call_response_format).
        if (
            self._is_tool_call_response_format(extracted_tool_calls)
            and len(extracted_tool_calls) > 0
        ):
            model_responses = [
                {item["name"]: item["arguments"]} for item in extracted_tool_calls
            ]

        return {
            "model_responses": model_responses,
            "model_responses_message_for_chat_history": model_responses_message_for_chat_history,
            "input_token": api_response.usage.prompt_tokens,
            "output_token": api_response.usage.completion_tokens,
        }

    # copied from phi_fc.py
    @override
    def _add_assistant_message_prompting(
        self, inference_data: dict, model_response_data: dict
    ) -> dict:
        """
        Add assistant message to the chat history.

        The raw completion text (not the parsed tool-call list) is appended so
        the chat history reflects exactly what the model produced.
        """
        inference_data["message"].append(
            {
                "role": "assistant",
                "content": model_response_data["model_responses_message_for_chat_history"],
            }
        )
        return inference_data

    # copied over from phi_fc.py
    @staticmethod
    def _extract_tool_calls(input_string: str) -> list:
        """
        Given an input_string, parse out all of the tool calls made in the body.

        Returns a list whose items should be dicts of the form
        ``{"name": ..., "arguments": ...}``; entries that fail to parse are
        passed through as-is so `_is_tool_call_response_format` can reject them.
        """
        # Bug fix: the previous ``lstrip("ASSISTANT:")`` stripped any run of
        # the characters A/S/I/N/T/":" from the left (lstrip takes a character
        # set, not a prefix), mangling outputs that merely start with one of
        # those letters. removeprefix drops only the literal marker.
        input_string = input_string.removeprefix("ASSISTANT:")
        input_string = input_string.lstrip()

        if "<tool_call>" in input_string:
            # granite-3.x will sometimes output a </tool_call> token at the very end
            input_string = input_string.replace("</tool_call>", "")
            pattern = r"<tool_call>(.*)"
        else:
            pattern = r"<\|tool_call\|>(.*)"

        matches = re.findall(pattern, input_string, re.DOTALL)

        # process matches into a list of dictionaries
        result = []
        for match in matches:
            # quickly strip the ends of whitespace
            match = match.strip()

            # if the match wasn't a list then make it a list, so a bare JSON
            # object parses to a single-element list
            if not match.startswith("[") and not match.endswith("]"):
                match = "[" + match + "]"

            try:
                match = json.loads(match)
            except json.JSONDecodeError:
                # Leave the raw string in place; the downstream sanity check
                # will reject it.
                pass

            if isinstance(match, list):
                for item in match:
                    # Handle the situation: ['{"name": "random_forest.train", "arguments": {"n_estimators": 100, "max_depth": 5, "data": my_data}}']
                    if isinstance(item, str):
                        # SECURITY NOTE: eval on untrusted model output is
                        # dangerous. It is kept (not replaced by
                        # ast.literal_eval) because these strings may contain
                        # bare identifiers (e.g. my_data) that literal parsing
                        # rejects with a different exception type.
                        item = eval(item)
                    result.append(item)
            else:
                result.append(match)

        return result

    @override
    def decode_ast(self, result, language="Python"):
        """Validate the already-decoded `[{func1:{param1:val1,...}}, ...]` structure."""
        # The input is already a list of dictionaries, so no need to decode;
        # anything that is not a list of dicts is rejected outright.
        if not isinstance(result, list) or any(
            not isinstance(item, dict) for item in result
        ):
            return []
        return result

    @override
    def decode_execute(self, result):
        """Convert the decoded tool-call list into executable function-call strings."""
        if not isinstance(result, list) or any(
            not isinstance(item, dict) for item in result
        ):
            return []
        return convert_to_function_call(result)

    # copied from phi_fc.py
    @staticmethod
    def _is_tool_call_response_format(input: Any) -> bool:
        """
        This is a helper method to detect if the tool call extracted by `_extract_tool_calls` is actually a tool call.
        This is important because the model might return a dictionary that looks like a tool call, but is not. It sometimes returns the function document.
        For example,
        "<|tool_call|>{\"name\": \"streaming_services.shows_list_and_ratings\", \"arguments\": {\"streaming_service\": \"Netflix\", \"show_list\": [\"Friends\"], \"sort_by_rating\": false}, \"type\": \"dict\", \"description\": \"Get a list of shows and their ratings on Netflix.\"}"
        The dictionary will cause the downstream decoding pipeline error, so it's better to do a sanity check here.
        """
        if not isinstance(input, list):
            return False

        for item in input:
            # A genuine tool call is a dict with exactly the keys
            # "name" and "arguments" — nothing more, nothing less.
            if not isinstance(item, dict):
                return False
            if "name" not in item:
                return False
            if "arguments" not in item:
                return False
            if len(item) != 2:
                return False

        return True

    @override
    def _format_prompt(self, messages, function):
        """
        Manually render the Granite-3.x chat template (reproduced below from the
        model's tokenizer config) into a single prompt string.

        {%- if messages[0]['role'] == 'system' %}
            {%- set system_message = messages[0]['content'] %}
            {%- set loop_messages = messages[1:] %}
        {%- else %}
            {%- set system_message = "Knowledge Cutoff Date: April 2024.
        Today's Date: " + strftime_now('%B %d, %Y') + ".
        You are Granite, developed by IBM." %}
            {%- if tools and documents %}
                {%- set system_message = system_message + " You are a helpful AI assistant with access to the following tools. When a tool is required to answer the user's query, respond with <|tool_call|> followed by a JSON list of tools used. If a tool does not exist in the provided list of tools, notify the user that you do not have the ability to fulfill the request.

        Write the response to the user's input by strictly aligning with the facts in the provided documents. If the information needed to answer the question is not available in the documents, inform the user that the question cannot be answered based on the available data." %}
            {%- elif tools %}
                {%- set system_message = system_message + " You are a helpful AI assistant with access to the following tools. When a tool is required to answer the user's query, respond with <|tool_call|> followed by a JSON list of tools used. If a tool does not exist in the provided list of tools, notify the user that you do not have the ability to fulfill the request." %}
            {%- elif documents %}
                {%- set system_message = system_message + " Write the response to the user's input by strictly aligning with the facts in the provided documents. If the information needed to answer the question is not available in the documents, inform the user that the question cannot be answered based on the available data." %}
            {%- elif thinking %}
            {%- set system_message = system_message + " You are a helpful AI assistant.
        Respond to every user query in a comprehensive and detailed way. You can write down your thoughts and reasoning process before responding. In the thought process, engage in a comprehensive cycle of analysis, summarization, exploration, reassessment, reflection, backtracing, and iteration to develop well-considered thinking process. In the response section, based on various attempts, explorations, and reflections from the thoughts section, systematically present the final solution that you deem correct. The response should summarize the thought process. Write your thoughts after 'Here is my thought process:' and write your response after 'Here is my response:' for each user query." %}
            {%- else %}
                {%- set system_message = system_message + " You are a helpful AI assistant." %}
            {%- endif %}
            {%- if 'citations' in controls and documents %}
                {%- set system_message = system_message + '

        In your response, use the symbols <co> and </co> to indicate when a fact comes from a document in the search result, e.g <co>0</co> for a fact from document 0. Afterwards, list all the citations with their corresponding documents in an ordered list.' %}
            {%- endif %}
            {%- if 'hallucinations' in controls and documents %}
                {%- set system_message = system_message + '

        Finally, after the response is written, include a numbered list of sentences from the response that are potentially hallucinated and not based in the documents.' %}
            {%- endif %}
            {%- set loop_messages = messages %}
        {%- endif %}
        {{- '<|start_of_role|>system<|end_of_role|>' + system_message + '<|end_of_text|>
        ' }}
        {%- if tools %}
            {{- '<|start_of_role|>tools<|end_of_role|>' }}
            {{- tools | tojson(indent=4) }}
            {{- '<|end_of_text|>
        ' }}
        {%- endif %}
        {%- if documents %}
            {{- '<|start_of_role|>documents<|end_of_role|>' }}
            {%- for document in documents %}
                {{- 'Document ' + loop.index0 | string + '
        ' }}
                {{- document['text'] }}
                {%- if not loop.last %}
                    {{- '

        '}}
                {%- endif%}
            {%- endfor %}
            {{- '<|end_of_text|>
        ' }}
        {%- endif %}
        {%- for message in loop_messages %}
            {{- '<|start_of_role|>' + message['role'] + '<|end_of_role|>' + message['content'] + '<|end_of_text|>
        ' }}
            {%- if loop.last and add_generation_prompt %}
                {{- '<|start_of_role|>assistant' }}
                    {%- if controls %}
                        {{- ' ' + controls | tojson()}}
                    {%- endif %}
                {{- '<|end_of_role|>' }}
            {%- endif %}
        {%- endfor %}
        """
        # formatted input prompt
        formatted_prompt = ""

        if messages[0]["role"] == "system":
            # set the system message & shift the remaining right
            system_prompt = messages[0]["content"]
            messages = messages[1:]
        else:
            # granite-3.x only provides instructions on how to create a <|tool_call|>
            # when there is no system message passed in. I think you can get better performance by always
            # giving these instructions, but this design deviates from how the model is realistically used
            system_prompt = (
                "Knowledge Cutoff Date: April 2024.\n"
                "Today's Date: April 29, 2025.\n"
                "You are Granite, developed by IBM."
            )
            if function:
                # we only add this when the tools are provided AND no system prompt is passed
                system_prompt += (
                    " You are a helpful AI assistant with access "
                    "to the following tools. When a tool is required to answer the user's query, respond "
                    "with <|tool_call|> followed by a JSON list of tools used. If a tool does not exist "
                    "in the provided list of tools, notify the user that you do not have the ability to fulfill the request."
                )

        # add the system message
        formatted_prompt += (
            f"<|start_of_role|>system<|end_of_role|>{system_prompt}<|end_of_text|>\n"
        )

        # now we format the messages
        if function:
            formatted_prompt += (
                "<|start_of_role|>tools<|end_of_role|>"
                + json.dumps(function, indent=4)
                + "<|end_of_text|>\n"
            )

        # Granite doesn't have any remaining special cases for tool calling on any of the other messages
        for msg in messages:
            formatted_prompt += (
                "<|start_of_role|>"
                + msg["role"]
                + "<|end_of_role|>"
                + msg["content"]
                + "<|end_of_text|>\n"
            )

        return formatted_prompt
